#include "config.h"

#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <sys/types.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <sqlite3.h>
#include <errno.h>
#include <getopt.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "core.h"
#include "cache.h"
#include "disk.h"
#include "sysy_lib.h"
#include "ynet_rpc.h"
#include "job_dock.h"
#include "net_global.h"
#include "diskmd.h"
#include "bh.h"
#include "tpool.h"
#include "lich_md.h"
#include "configure.h"
#include "dbg.h"
#include "replica.h"
#include "core.h"
#include "coroutine.h"
#include "fnotify.h"
#include "locator_rpc.h"

typedef struct {
        co_fork_hook_t fork_hook;

        int chknum;
        int chkidx;
        chkid_t *chkids;
        diskloc_t *locs;

        const char *pool;
        const chkid_t *parent;
        int priority;
        uint64_t meta_version;
        uint64_t fingerprint;
        int flag;
} db_create_ctx_t;

int disk_sqlite3_create(const char *pool, const chkid_t *chkid,
                        const diskloc_t *loc, int chknum,
                        const chkid_t *parent,
                        uint64_t meta_version, int flag);


static void __db_create_ctx_free(db_create_ctx_t *ctxs, int count)
{
        int i;
        db_create_ctx_t *ctx;

        for (i=0; i < count; i++) {
                ctx = &ctxs[i];
                if (ctx->chkids)
                        yfree((void **)&ctx->chkids);

                if (ctx->locs)
                        yfree((void **)&ctx->locs);
        }
}

/**
 * 按db分组，每个task负责一个db，组内批量提交
 *
 * @param ctxs
 * @param count
 * @param db_count OUT
 * @param chkids
 * @param locs
 * @param chknum
 * @return
 */
static int __db_create_ctx_group_by(db_create_ctx_t *ctxs, int count, int *db_count,
                                  const chkid_t *chkids, const diskloc_t *locs, int chknum)
{
        int ret, i, db;
        db_create_ctx_t *ctx;

        *db_count = 0;

        for (i=0; i < chknum; i++) {
                db = __DISK_DB_HASH__(&chkids[i]);
                ctxs[db].chknum++;
        }

        for (i=0; i < __DB_HASH__; i++) {
                ctx = &ctxs[i];
                if (ctx->chknum == 0)
                        continue;

                // 共映射到多少db
                (*db_count)++;

                // 分配所需空间
                ret = ymalloc((void **)&ctx->chkids, sizeof(chkid_t) * ctx->chknum);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                ret = ymalloc((void **)&ctx->locs, sizeof(diskloc_t) * ctx->chknum);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        // group by chkid
        for (i=0; i < chknum; i++) {
                db = __DISK_DB_HASH__(&chkids[i]);
                ctx = &ctxs[db];
                ctx->chkids[ctx->chkidx] = chkids[i];
                ctx->locs[ctx->chkidx] = locs[i];
                ctx->chkidx++;
        }

        return 0;
err_ret:
        __db_create_ctx_free(ctxs, count);
        return ret;
}

static void __db_create_multi(void *arg)
{
        int ret;
        db_create_ctx_t *ctx = arg;

        ret = disk_sqlite3_create(ctx->pool, ctx->chkids, ctx->locs, ctx->chknum, ctx->parent,
                                  ctx->meta_version, ctx->flag);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        co_fork_return(ctx->fork_hook.fork, ctx->fork_hook.idx, 0);
        return;
err_ret:
        co_fork_return(ctx->fork_hook.fork, ctx->fork_hook.idx, ret);
}

int disk_sqlite3_create_multi(const char *pool, const chkid_t *chkid, const diskloc_t *locs, int chknum,
                              const chkid_t *parent,
                              uint64_t meta_version, int flag)
{
        int ret, i, db_num = 0;
        db_create_ctx_t ctxs[__DB_HASH__], *ctx;
        co_fork_t *fork;

        memset(&ctxs, 0x0, sizeof(ctxs));

        ret = __db_create_ctx_group_by(ctxs, __DB_HASH__, &db_num, chkid, locs, chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);  

        YASSERT(db_num > 0);

        // 传入总的任务数
        ret = co_fork_create(&fork, __FUNCTION__, db_num);
        if (unlikely(ret))
                GOTO(err_ctx, ret);

        for (i=0; i < __DB_HASH__; i++) {
                ctx = &ctxs[i];
                if (ctx->chknum == 0)
                        continue;

                YASSERT(ctx->chkids && ctx->locs);

                ctx->pool = pool;
                ctx->parent = parent;
                ctx->priority = -1;
                ctx->meta_version = meta_version;
                ctx->flag = flag;

                // schedule_task_new(__FUNCTION__, __db_create_multi, ctx, -1);

                // fork new task
                co_fork_add(fork, __db_create_multi, ctx);
        }

        // wait here, 检查返回值
        ret = co_fork_join(fork);
        if (unlikely(ret))
                GOTO(err_fork, ret);

        yfree((void **)&fork);
        __db_create_ctx_free(ctxs, __DB_HASH__);
        return 0;
err_fork:
        yfree((void **)&fork);
err_ctx:
        __db_create_ctx_free(ctxs, __DB_HASH__);
err_ret:
        return ret;
}
